---
title: DASE Components Explained (Classification)
---

<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

<%= partial 'shared/dase/dase',
locals: {
  template_name: 'Classification Engine Template',
  evaluation_link: '/evaluation/paramtuning/'
} %>

## The Engine Design

As you can see from the Quick Start, *MyClassification* takes a JSON prediction
query, e.g. `{ "attr0":4, "attr1":3, "attr2":8 }`, and returns a JSON predicted result.

WARNING: For versions earlier than v0.3.1, the query is an array of feature values: `{ "features": [4, 3, 8] }`

In MyClassification/src/main/scala/***Engine.scala***, the `Query` case class
defines the format of **query**, such as `{ "attr0":4, "attr1":3, "attr2":8 }`:

```scala
case class Query(
  attr0: Double,
  attr1: Double,
  attr2: Double
)
```

The `PredictedResult` case class defines the format of **predicted result**,
such as `{"label":2.0}`:

```scala
case class PredictedResult(
  val label: Double
)
```

Finally, `ClassificationEngine` is the Engine Factory that defines the
components this engine will use: Data Source, Data Preparator, Algorithm(s) and
Serving components.

```scala
object ClassificationEngine extends IEngineFactory {
  def apply() = {
    new Engine(
      classOf[DataSource],
      classOf[Preparator],
      Map("naive" -> classOf[NaiveBayesAlgorithm]),
      classOf[Serving])
  }
}
```

### Spark MLlib

Spark's MLlib NaiveBayes algorithm takes training data as an RDD, i.e.
`RDD[LabeledPoint]`, and trains a model, which is a `NaiveBayesModel` object.

PredictionIO's MLlib Classification engine template, on which *MyClassification*
is based, integrates this algorithm under the DASE architecture. We will take a
closer look at the DASE code below.

> See the [MLlib Naive Bayes documentation](https://spark.apache.org/docs/latest/mllib-naive-bayes.html)
to learn more about MLlib's NaiveBayes algorithm.

## Data

In the DASE architecture, data is prepared by two components in sequence: *Data
Source* and *Data Preparator*. Together they take data from the data store and
prepare `RDD[LabeledPoint]` for the NaiveBayes algorithm.

### Data Source

In MyClassification/src/main/scala/***DataSource.scala***, the `readTraining`
method of the class `DataSource` reads and selects data from the Event Server's
data store and returns `TrainingData`.

```scala
case class DataSourceParams(appName: String) extends Params

class DataSource(val dsp: DataSourceParams)
  extends PDataSource[TrainingData, EmptyEvaluationInfo, Query, EmptyActualResult] {

  @transient lazy val logger = Logger[this.type]

  override
  def readTraining(sc: SparkContext): TrainingData = {

    val labeledPoints: RDD[LabeledPoint] = PEventStore.aggregateProperties(
      appName = dsp.appName,
      entityType = "user",
      // only keep entities with these required properties defined
      required = Some(List("plan", "attr0", "attr1", "attr2")))(sc)
      // aggregateProperties() returns RDD pair of
      // entity ID and its aggregated properties
      .map { case (entityId, properties) =>
        try {
          LabeledPoint(properties.get[Double]("plan"),
            Vectors.dense(Array(
              properties.get[Double]("attr0"),
              properties.get[Double]("attr1"),
              properties.get[Double]("attr2")
            ))
          )
        } catch {
          case e: Exception => {
            logger.error(s"Failed to get properties ${properties} of" +
              s" ${entityId}. Exception: ${e}.")
            throw e
          }
        }
      }.cache()

    new TrainingData(labeledPoints)
  }
}
```

`PEventStore` is an object that provides functions to access data collected through the *Event Server*.
`PEventStore.aggregateProperties` aggregates the event records of the 4 properties
(attr0, attr1, attr2 and plan) for each user.
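
To make the idea of property aggregation concrete, here is a minimal plain-Scala sketch. It is not PredictionIO's implementation: `SetEvent` and `aggregate` are hypothetical names, property values are simplified to `Double`, and only "set"-style events are modelled. It assumes that later events overwrite earlier values for the same property and that entities missing a required property are dropped, mirroring the `required` argument above.

```scala
// Plain-Scala sketch of property aggregation; NOT PredictionIO's implementation.
// `SetEvent` and `aggregate` are hypothetical names used only for illustration.
object AggregatePropertiesSketch {

  // A simplified "set" event: an entity sets some properties at a given time.
  case class SetEvent(entityId: String, time: Long, properties: Map[String, Double])

  // Merge each entity's events in time order so that later values overwrite
  // earlier ones, then keep only entities that define every required property.
  def aggregate(
    events: Seq[SetEvent],
    required: List[String]
  ): Map[String, Map[String, Double]] = {
    events
      .groupBy(_.entityId)
      .map { case (entityId, entityEvents) =>
        val merged = entityEvents
          .sortBy(_.time)
          .foldLeft(Map.empty[String, Double])((acc, e) => acc ++ e.properties)
        entityId -> merged
      }
      .filter { case (_, props) => required.forall(props.contains) }
  }
}
```

Each surviving entry plays the role of one `(entityId, properties)` pair that the `map` step in `readTraining` turns into a `LabeledPoint`.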

PredictionIO automatically loads the parameters of *datasource* specified in
MyClassification/***engine.json***, including *appName*, to `dsp`.

In ***engine.json***:

```
{
  ...
  "datasource": {
    "params": {
      "appName": "MyApp1"
    }
  },
  ...
}
```

In each resulting `LabeledPoint`, the `plan` property is the label, and the
`attr0`, `attr1` and `attr2` properties form the feature vector.

The class definition of `TrainingData` is:

```scala
class TrainingData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable
```
PredictionIO passes the returned `TrainingData` object to *Data Preparator*.


### Data Preparator

In MyClassification/src/main/scala/***Preparator.scala***, the `prepare` method of
class `Preparator` takes `TrainingData`. It then conducts any necessary feature
selection and data processing tasks. At the end, it returns `PreparedData` which
should contain the data *Algorithm* needs. For MLlib NaiveBayes, it is
`RDD[LabeledPoint]`.

By default, `prepare` simply copies the unprocessed `TrainingData` data to
`PreparedData`:

```scala
class PreparedData(
  val labeledPoints: RDD[LabeledPoint]
) extends Serializable

class Preparator
  extends PPreparator[TrainingData, PreparedData] {

  def prepare(sc: SparkContext, trainingData: TrainingData): PreparedData = {
    new PreparedData(trainingData.labeledPoints)
  }
}
```

PredictionIO passes the returned `PreparedData` object to Algorithm's `train`
function.
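
If you want `prepare` to do more than copy the data, one option is to drop rows the algorithm cannot use. The sketch below illustrates such a step in plain Scala, using a `Seq` in place of an RDD so that it runs without Spark. `Row` and `dropInvalidRows` are hypothetical names, and the rule itself is an assumption for illustration: MLlib's multinomial NaiveBayes expects non-negative feature values, so rows with negative, NaN or infinite features are filtered out.

```scala
// Plain-Scala sketch of a data-cleaning step a Preparator could apply.
// `Row` and `dropInvalidRows` are hypothetical; the template does not define them.
object PreparatorSketch {

  // Stand-in for MLlib's LabeledPoint: a label plus a dense feature array.
  case class Row(label: Double, features: Array[Double])

  // Keep only rows whose features are all non-negative and finite.
  // NaN fails the `>= 0.0` comparison, so it is rejected as well.
  def dropInvalidRows(rows: Seq[Row]): Seq[Row] =
    rows.filter(_.features.forall(v => v >= 0.0 && !v.isInfinity))
}
```

With an RDD, the same predicate could be passed to `filter` on `trainingData.labeledPoints` before wrapping the result in `PreparedData`.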

## Algorithm

In MyClassification/src/main/scala/***NaiveBayesAlgorithm.scala***, the two
methods of the algorithm class are `train` and `predict`. `train` is responsible
for training a predictive model. PredictionIO stores this model, and
`predict` is responsible for using it to make predictions.

### train(...)

`train` is called when you run **pio train**. This is where the MLlib NaiveBayes
algorithm, i.e. `NaiveBayes.train`, is used to train a predictive model.

```scala
def train(sc: SparkContext, data: PreparedData): NaiveBayesModel = {
  NaiveBayes.train(data.labeledPoints, ap.lambda)
}
```

In addition to `RDD[LabeledPoint]` (i.e. `data.labeledPoints`),
`NaiveBayes.train` takes 1 parameter: *lambda*.

The value of this parameter is specified in the *algorithms* section of
MyClassification/***engine.json***:

```
{
  ...
  "algorithms": [
    {
      "name": "naive",
      "params": {
        "lambda": 1.0
      }
    }
  ]
  ...
}
```

PredictionIO automatically loads these values into the constructor parameter `ap`,
which has a corresponding case class `AlgorithmParams`:

```scala
case class AlgorithmParams(
  lambda: Double
) extends Params
```

`NaiveBayes.train` then returns a `NaiveBayesModel`. PredictionIO will
automatically store the returned model.
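
To show what *lambda* does, the following plain-Scala sketch computes smoothed log-likelihoods for one class. It assumes the standard additive (Laplace) smoothing formula for multinomial Naive Bayes, `log((sum_j + lambda) / (total + lambda * numFeatures))`; `SmoothingSketch` and `logLikelihoods` are hypothetical names and this is an illustration, not MLlib's code.

```scala
// Plain-Scala sketch of additive smoothing; an illustration, not MLlib's code.
// `SmoothingSketch` and `logLikelihoods` are hypothetical names.
object SmoothingSketch {

  // featureSums(j) is the sum of feature j over all training rows of one class.
  // Returns log((featureSums(j) + lambda) / (total + lambda * numFeatures)).
  def logLikelihoods(featureSums: Array[Double], lambda: Double): Array[Double] = {
    val logDenominator = math.log(featureSums.sum + lambda * featureSums.length)
    featureSums.map(s => math.log(s + lambda) - logDenominator)
  }
}
```

With `lambda = 0.0`, a feature that never occurs in a class gets a log-likelihood of negative infinity, which rules that class out for any query containing the feature. A positive `lambda` keeps every value finite.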

### predict(...)

The `predict` method is called when you send a JSON query to
http://localhost:8000/queries.json. PredictionIO converts the query, such as `{ "attr0":4, "attr1":3, "attr2":8 }`, to an instance of the `Query` class you defined previously.

The predictive model `NaiveBayesModel` of MLlib NaiveBayes offers a function
called `predict`. `predict` takes a dense vector of features. It predicts the
label of the item represented by this feature vector.

```scala
  def predict(model: NaiveBayesModel, query: Query): PredictedResult = {
    val label = model.predict(Vectors.dense(
      query.attr0, query.attr1, query.attr2
    ))
    PredictedResult(label)
  }
```

> You have defined the class `PredictedResult` earlier in this page.

PredictionIO passes the returned `PredictedResult` object to *Serving*.
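
For intuition about what the model's `predict` computes, here is a plain-Scala sketch of multinomial Naive Bayes scoring: each label's score is its log prior plus the dot product of its log-likelihoods with the feature vector, and the label with the highest score wins. `TinyModel` and its fields are hypothetical stand-ins, not the fields of MLlib's `NaiveBayesModel`.

```scala
// Plain-Scala sketch of multinomial Naive Bayes scoring; not MLlib's code.
// `TinyModel` and its field names are hypothetical stand-ins.
object PredictSketch {

  case class TinyModel(
    labels: Array[Double],               // one entry per class
    logPriors: Array[Double],            // log P(class), same order as labels
    logLikelihoods: Array[Array[Double]] // indexed by class, then by feature
  )

  // Score each class as logPrior + sum(logLikelihood * feature value)
  // and return the label of the highest-scoring class.
  def predict(model: TinyModel, features: Array[Double]): Double = {
    val scores = model.labels.indices.map { c =>
      val dot = model.logLikelihoods(c).zip(features).map { case (t, x) => t * x }.sum
      model.logPriors(c) + dot
    }
    model.labels(scores.indexOf(scores.max))
  }
}
```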

## Serving

The `serve` method of class `Serving` processes the predicted results. It is also
responsible for combining multiple predicted results into one if you have more
than one predictive model. *Serving* then returns the final predicted result.
PredictionIO will convert it to a JSON response automatically.

In MyClassification/src/main/scala/***Serving.scala***:

```scala
class Serving
  extends LServing[Query, PredictedResult] {

  override
  def serve(query: Query,
    predictedResults: Seq[PredictedResult]): PredictedResult = {
    predictedResults.head
  }
}
```

When you send a JSON query to http://localhost:8000/queries.json,
`PredictedResult` from all models will be passed to `serve` as a sequence, i.e.
`Seq[PredictedResult]`.

> An engine can train multiple models if you specify more than one Algorithm
component in `object ClassificationEngine` inside ***Engine.scala***. Since only
one `NaiveBayesAlgorithm` is implemented by default, this `Seq` contains one
element.

In this case, `serve` simply returns the predicted result of the first, and
only, algorithm, i.e. `predictedResults.head`.
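
If an engine did use several algorithms, `serve` would need a rule for combining their results. One possible rule, sketched below in plain Scala, is a majority vote over the predicted labels. This is an illustrative assumption rather than part of the template; the `Query` and `PredictedResult` case classes are redeclared locally so that the example runs without PredictionIO.

```scala
// Plain-Scala sketch of a majority-vote serving rule; not part of the template.
// Query and PredictedResult are redeclared here so the sketch is self-contained.
object ServingSketch {

  case class Query(attr0: Double, attr1: Double, attr2: Double)
  case class PredictedResult(label: Double)

  // Return the label predicted by the largest number of algorithms.
  // Assumes predictedResults is non-empty; ties are resolved arbitrarily.
  def serve(query: Query, predictedResults: Seq[PredictedResult]): PredictedResult = {
    val (winningLabel, _) = predictedResults
      .groupBy(_.label)
      .map { case (label, votes) => label -> votes.size }
      .maxBy { case (_, count) => count }
    PredictedResult(winningLabel)
  }
}
```

With a single algorithm, this rule returns the same result as `predictedResults.head`.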

Congratulations! You have just learned how to customize and build a
production-ready engine. Have fun!
